/*
 * CTcpMediaCaster.cpp
 *
 *  Created on: 2016年5月13日
 *      Author: terry
 */

#include "CTcpMediaCaster.h"
#include "CLog.h"
#include "ClockTime.h"
#include <limits>
#include <algorithm>
#include <errno.h>
#include "RtpHeader.h"
#include "TFileUtil.h"
#include "TStringUtil.h"
#include "RtpPort.h"

#include "CRtpPackager.h"
#include "H264RtpPackager.h"
#include "H265RtpPackager.h"

#include <functional>
#include <errno.h>
#include "AppConst.h"
#include "RtpConst.h"


static const size_t THREAD_STACK_SIZE = 2 * 1024 * 1024;



namespace av
{

CTcpMediaCaster::PrefixMode CTcpMediaCaster::PREFIX_MODE = CTcpMediaCaster::kDword;
bool CTcpMediaCaster::RECONNECT_ENABLED = true;


CTcpMediaCaster::CTcpMediaCaster():
		m_packetCount(),
		m_lastTime(),
		m_packager(new CRtpPackager(MEDIA_TYPE_DATA, MEDIA_CODEC_NONE)),
		m_seq(),
		m_ssrc(),
		m_payload(),
		m_initTime(0),
		m_id(),
		m_audioPackager(new CRtpPackager(MEDIA_TYPE_AUDIO, MEDIA_CODEC_NONE)),
		m_audioPayload(),
		m_audioSeq(),
        m_tcpSink()
{
	setStackSize(THREAD_STACK_SIZE);
}

CTcpMediaCaster::~CTcpMediaCaster()
{
	close();
}

int CTcpMediaCaster::open(const std::string& ip, uint16_t port, int codec, int clockRate, int payload, int ssrc)
{
	if (isOpen())
	{
		close();
	}

    m_addr.set(ip, port);
    m_payload = payload;
    m_ssrc = ssrc;

    m_packager.reset(createPackager(codec));
    m_packager->setPayload(m_payload);

    start();

    return 0;
}

void CTcpMediaCaster::close()
{
	stop();

	removeAllTarget();

	clearCache();
}

bool CTcpMediaCaster::isOpen()
{
	return isRunning();
}

void CTcpMediaCaster::setID(int id)
{
	m_id = id;
}

int CTcpMediaCaster::getID()
{
	return m_id;
}

bool CTcpMediaCaster::addTarget(const std::string& ip, uint16_t port)
{
	NetAddress addr(ip, port);

	bool done = false;
	comn::AutoCritSec lock(m_cs);
	RtpTargetArray::iterator it = std::find(m_targets.begin(), m_targets.end(), addr);
	if (it == m_targets.end())
	{
		m_targets.push_back(addr);

		TcpClientPtr client(new TcpClient());
        client->enableReconnect(RECONNECT_ENABLED);
		client->setSendBuffer(AppConst::SEND_BUFFER_SIZE);
        client->setSink(m_tcpSink);
		client->connect(ip, port);

		m_tcpClients.push_back(client);

		done = true;
	}

	return done;
}

struct TcpClientPredicate : public std::unary_function< TcpClientPtr, bool >
{
	std::string m_ip;
	uint16_t m_port;

	TcpClientPredicate(const std::string& ip, uint16_t port):
		m_ip(ip),
		m_port(port)
	{
	}

	bool operator ()(const TcpClientPtr& client) const
	{
		return client->equals(m_ip, m_port);
	}
};

void CTcpMediaCaster::removeTarget(const std::string& ip, uint16_t port)
{
	NetAddress addr(ip, port);

    comn::AutoCritSec lock(m_cs);
    RtpTargetArray::iterator it = std::remove(m_targets.begin(), m_targets.end(), addr);
    if (it != m_targets.end())
    {
        m_targets.erase(it);

        TcpClientPredicate pred(ip, port);
        TcpClientArray::iterator it = std::find_if(m_tcpClients.begin(), m_tcpClients.end(), pred);
        if (it != m_tcpClients.end())
        {
        	m_tcpClients.erase(it);
        }
    }

}

size_t CTcpMediaCaster::getTargetCount()
{
    comn::AutoCritSec lock(m_cs);
    return m_targets.size();
}

void CTcpMediaCaster::removeAllTarget()
{
    comn::AutoCritSec lock(m_cs);
	m_targets.clear();

	m_tcpClients.clear();
}

void CTcpMediaCaster::onMediaFormat(const MediaFormat& fmt)
{
	m_audioPackager.reset(createPackager(fmt.m_audioCodec));

	if (fmt.m_audioCodec == MEDIA_CODEC_G711A)
	{
		m_audioPayload = 8;
	}
	else if (fmt.m_audioCodec == MEDIA_CODEC_G711U)
	{
		m_audioPayload = 0;
	}
	else
	{
		m_audioPayload = m_payload + 1;
	}

	m_audioPackager->setPayload(m_audioPayload);
}

void CTcpMediaCaster::onMediaPacket(MediaPacketPtr& pkt)
{
    if (pkt->empty())
    {
        return;
    }

    m_pktQueue.push(pkt);
}

void CTcpMediaCaster::onMediaEvent(int event)
{
	// pass
}


int CTcpMediaCaster::run()
{
	while (!m_canExit)
	{
		MediaPacketPtr pkt;
		if (!m_pktQueue.pop(pkt, 1000))
		{
			continue;
		}

		sendPacket(pkt);
	}
	return 0;
}

void CTcpMediaCaster::doStop()
{
    m_pktQueue.cancelWait();
}

void CTcpMediaCaster::onSlicePacket(const RtpPacket& pkt)
{
	assemble(pkt);
}

RtpPackager* CTcpMediaCaster::createPackager(int codec)
{
	if (codec == MEDIA_CODEC_H264)
	{
		return new H264RtpPackager();
	}
	else if (codec == MEDIA_CODEC_HEVC)
	{
		return new H265RtpPackager();
	}
	else if (codec == MEDIA_CODEC_PS)
	{
		return new CRtpPackager(MEDIA_TYPE_DATA, MEDIA_CODEC_PS);
	}
	else if ((codec == MEDIA_CODEC_G711A) || (codec == MEDIA_CODEC_G711U))
	{
		return new CRtpPackager(MEDIA_TYPE_AUDIO, (MediaCodec)codec);
	}
	return new CRtpPackager(MEDIA_TYPE_DATA, (MediaCodec)codec);
}


void CTcpMediaCaster::clearCache()
{
    m_pktQueue.clear();
}

void CTcpMediaCaster::sendPacket(MediaPacketPtr& pkt)
{
	MediaPacket& packet = *pkt;

	m_buffer.reset(new comn::ByteBuffer());

	int count = pkt->size / av::RtpConst::RTP_PACKET_SIZE + 1;
	int length = pkt->size + count * (sizeof(RTP_FIXED_HEADER) * 2);
	m_buffer->ensure(length);

	if (packet.type == MEDIA_TYPE_AUDIO)
	{
		m_audioPackager->slice(packet, av::RtpConst::RTP_PACKET_SIZE, this);
	}
	else
	{
		m_packager->slice(packet, av::RtpConst::RTP_PACKET_SIZE, this);
	}

	sendBuffer(m_buffer);
}

void CTcpMediaCaster::assemble(const RtpPacket& pkt)
{
	m_buffer->expect(pkt.size + sizeof(RTP_FIXED_HEADER) + sizeof(int32_t));

	RTP_FIXED_HEADER header;
	memset(&header, 0, sizeof(header));
	header.version = 2;
	header.marker = pkt.mark;

	header.timestamp = htonl(m_initTime + pkt.ts);

	header.ssrc = htonl(m_ssrc);

	if (pkt.pt == m_payload)
	{
		header.seq_no = htons(m_seq++);
	}
	else
	{
		header.seq_no = htons(m_audioSeq++);
	}
	
	header.payload = pkt.pt;

    if (PREFIX_MODE == kDword)
    {
        int32_t length = pkt.size + sizeof(RTP_FIXED_HEADER);
        length = htonl(length);
        m_buffer->write((char*)&length, sizeof(length));
    }
    else
    {
        int16_t length = pkt.size + sizeof(RTP_FIXED_HEADER);
        length = htons(length);
        m_buffer->write((char*)&length, sizeof(length));
    }

	m_buffer->write((char*)&header, sizeof(header));
	m_buffer->write(pkt.data, pkt.size);
}

void CTcpMediaCaster::sendBuffer(ByteBufferPtr& buffer)
{
	for (size_t i = 0; i < m_tcpClients.size(); i ++)
	{
		m_tcpClients[i]->write(buffer);
	}
}

//void CTcpMediaCaster::onTcpEvent(void* obj, int event)
//{
//    
//}

void CTcpMediaCaster::setSink(TcpClient::Sink* sink)
{
    m_tcpSink = sink;
}

} /* namespace av */
